# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base interface for fast Fourier Transforms.
:class:`~hysop.numerics.fft.FFTI`
:class:`~hysop.numerics.fft.FFTPlanI`
:class:`~hysop.numerics.fft.FFTQueueI`
:class:`~hysop.numerics.fft.HysopFFTWarning`
:class:`~hysop.numerics.fft.HysopFFTDataLayoutError`
Methods defined here:
simd_alignment, is_byte_aligned, mk_shape, mk_view
"""
from abc import ABCMeta, abstractmethod
import warnings
import functools
import numpy as np
from pyfftw import simd_alignment, is_byte_aligned as is_simd_byte_aligned
from hysop import vprint, __VERBOSE__
from hysop.constants import Backend, TransformType
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.numerics import float_to_complex_dtype, complex_to_float_dtype
from hysop.tools.units import bytes2str
from hysop.tools.warning import HysopWarning
from hysop.tools.spectral_utils import SpectralTransformUtils as STU
from hysop.core.arrays.array import Array
from hysop.core.arrays.array_backend import ArrayBackend
from hysop.testsenv import __HAS_OPENCL_BACKEND__
[docs]
def is_byte_aligned(array):
from hysop.backend.host.host_array import HostArray
if __HAS_OPENCL_BACKEND__:
from hysop.backend.device.opencl.opencl_array import OpenClArray, clArray
if isinstance(array, (HostArray, np.ndarray)):
return is_simd_byte_aligned(array)
elif __HAS_OPENCL_BACKEND__ and isinstance(array, (OpenClArray, clArray.Array)):
return array.offset == 0
else:
msg = f"Unknown array type {type(array)}."
raise TypeError(msg)
[docs]
def mk_shape(base_shape, axis, N):
"""
Utility function to create a modified shape from base_shape on
a specified axis.
"""
shape = list(base_shape)
shape[axis] = N
return tuple(shape)
[docs]
def mk_view(ndim, axis, *args, **kwds):
"""
Utility function to create a view on a n-dimensional array.
Returns a tuple containing default axe view (ellipsis)
on all axis but the one specified by 'axis' which is replaced
by slice(*args) if len(args)>1 else args[0].
"""
default = kwds.pop("default", slice(None, None, None))
assert args, "Need at least one arg !"
assert not kwds, f"Unknown keyword arguments: {kwds.keys()}"
view = [default] * ndim
if len(args) == 1:
view[axis] = args[0]
else:
view[axis] = slice(*args)
return tuple(view)
[docs]
class HysopFFTWarning(HysopWarning):
"""
Specific tag to issue FFT related warnings.
"""
pass
[docs]
class HysopFFTDataLayoutError(ValueError):
"""
Specific error to raise for incompatible strides.
"""
pass
[docs]
class FFTQueueI:
"""Command queue like objects to define n-dimensional transforms."""
[docs]
@abstractmethod
def execute(self, wait_for=None):
"""Execute all planned plans."""
pass
[docs]
@abstractmethod
def __iadd__(self, *plans):
"""Add a plan to the queue."""
pass
[docs]
def __call__(self, **kwds):
"""Alias for execute."""
return self.execute(**kwds)
[docs]
class FFTPlanI(metaclass=ABCMeta):
"""
Common inteface for FFT plans.
Basically just a functor that holds relevant data
to execute a preconfigurated FFT-like tranform.
"""
def __init__(self, verbose=__VERBOSE__):
self.verbose = verbose
self._setup = False
self._allocated = False
[docs]
@abstractmethod
def output_array(self):
"""
Return currently planned output array.
"""
pass
[docs]
def setup(self, queue=None):
"""
Method that has to be called before any call to execute.
"""
if self._setup:
msg = "Plan was already setup..."
raise RuntimeError(msg)
self._setup = True
return self
@property
def required_buffer_size(self):
"""
Return the required temporary buffer size in bytes to
compute the transform.
"""
assert self._setup
return 0
[docs]
def allocate(self, buf=None):
"""Provide or allocate required temporary buffer."""
assert self._setup
assert not self._allocated
self._allocated = True
[docs]
@abstractmethod
def execute(self):
"""
Execute the FFT plan on current input and output array.
"""
pass
[docs]
def __call__(self, a=None, out=None, **kwds):
"""
Apply the FFT plan to possibly new input or output arrays.
"""
if (a is not None) or (out is not None):
msg = "New array execute is not available for FFT backend {}."
msg = msg.format(type(self).__name__)
raise RuntimeError(msg)
self.execute(**kwds)
[docs]
class FFTI(metaclass=ABCMeta):
"""
Interface to compute local to process FFT-like transforms.
Common inteface for all array backends, based on the numpy.fft interface.
Standard FFTs: complex to complex (C2C)
fft() Compute the 1-dimensional discrete Fourier Transform.
ifft() Compute the 1-dimensional inverse discrete Fourier Transform.
Real data FFTS: real to complex (R2C) and complex to real (C2R)
rfft() Compute the 1-dimensional discrete Fourier Transform for real input.
irfft() Compute the inverse of the 1-dimensional FFT of real input.
Real FFTS: real to real (R2R)
dct() Compute one of the discrete cosine transforms of a real input.
dst() Compute one of the discrete sine transforms of a real input.
Supported R2R transforms are at least:
DCT-I, DCT-II, DCT-III
DST-I, DST-II, DST-III
Other R2R transforms:
DCT-IV and DCT-IV are only supported by the FFTW backend at this time.
DCT-V to DCT-VIII and DST-V to DST-VII are not supported by any FFT backend.
About floating point precision:
By default, both simple and double precision are supported.
numpy only supports double precision (simple precision is supported by casting).
FFTW also supports long double precision.
Normalization:
The default normalization has the direct transforms unscaled and the inverse transform
is scaled by 1/N where N is the logical size of the transform.
N should not to be confused with the physical size of the input arrays n:
FFT, RFFT: N = n
DCT-II, DCT-III, DCT-IV: N = 2*n
DST-II, DST-III, DST-IV: N = 2*n
DCT-I: N = 2*(n-1)
DST-I: N = 2*(n+1)
Orthogonal normalization is not supported by default, however the user
may specify its custom normalization for each transform via the 'scaling'
keyword parameter.
Inverse transforms (up to scaling):
Just add i in front of the method name to get the inverse transform with good scaling.
For a given transform T, iT(T(X)) should always yield X within machine accuracy.
Underlying inverse transform mapping is:
FFT: IFFT
RFFT: IRFFT
DCT-I: DCT-I
DCT-II: DCT-III
DCT-III: DCT-II
DCT-IV: DCT-IV
DST-I: DST-I
DST-II: DST-III
DST-III: DST-II
DST-IV: DST-IV
Other methods that this interface defines:
*Create queue
*Transpose
*Copy
*Zero fill
Those methods will be used by the n-dimensional planner.
"""
__transform2fn = {
TransformType.FFT: ("fft", {}),
TransformType.IFFT: ("ifft", {}),
TransformType.RFFT: ("rfft", {}),
TransformType.IRFFT: ("irfft", {}),
TransformType.DCT_I: ("dct", {"type": 1}),
TransformType.DCT_II: ("dct", {"type": 2}),
TransformType.DCT_III: ("dct", {"type": 3}),
TransformType.DCT_IV: ("dct", {"type": 4}),
TransformType.IDCT_I: ("idct", {"type": 1}),
TransformType.IDCT_II: ("idct", {"type": 2}),
TransformType.IDCT_III: ("idct", {"type": 3}),
TransformType.IDCT_IV: ("idct", {"type": 4}),
TransformType.DST_I: ("dst", {"type": 1}),
TransformType.DST_II: ("dst", {"type": 2}),
TransformType.DST_III: ("dst", {"type": 3}),
TransformType.DST_IV: ("dst", {"type": 4}),
TransformType.IDST_I: ("idst", {"type": 1}),
TransformType.IDST_II: ("idst", {"type": 2}),
TransformType.IDST_III: ("idst", {"type": 3}),
TransformType.IDST_IV: ("idst", {"type": 4}),
}
[docs]
@classmethod
def default_interface_from_backend(
cls, backend, enable_opencl_host_buffer_mapping, **kwds
):
check_instance(backend, ArrayBackend)
if backend.kind is Backend.HOST:
from hysop.numerics.fft.host_fft import HostFFTI
assert not enable_opencl_host_buffer_mapping
return HostFFTI.default_interface(**kwds)
elif backend.kind is Backend.OPENCL:
if enable_opencl_host_buffer_mapping:
from hysop.numerics.fft.host_fft import HostFFTI
return HostFFTI.default_interface(
backend=backend.host_array_backend, **kwds
)
else:
from hysop.numerics.fft.opencl_fft import OpenClFFTI
return OpenClFFTI.default_interface(cl_env=backend.cl_env, **kwds)
else:
msg = f"Unknown backend kind {backend.kind}."
[docs]
def check_backend(self, backend, enable_opencl_host_buffer_mapping):
check_instance(backend, ArrayBackend)
if enable_opencl_host_buffer_mapping:
if self.backend is not backend.host_array_backend:
msg = "Host array backend mismatch {} vs {}."
msg = msg.format(self.backend, backend)
raise RuntimeError(msg)
else:
if self.backend is not backend:
msg = "Backend mismatch {} vs {}."
msg = msg.format(self.backend, backend)
raise RuntimeError(msg)
def __init__(self, backend, warn_on_allocation=True, error_on_allocation=False):
"""Initializes the interface and default supported real and complex types."""
from hysop.core.arrays.array_backend import ArrayBackend
check_instance(backend, ArrayBackend)
check_instance(warn_on_allocation, bool)
check_instance(error_on_allocation, bool)
self.supported_ftypes = (np.float32, np.float64)
self.supported_ctypes = (np.complex64, np.complex128)
self.supported_cosine_transforms = (1, 2, 3)
self.supported_sine_transforms = (1, 2, 3)
self.backend = backend
self.warn_on_allocation = warn_on_allocation
self.error_on_allocation = error_on_allocation
[docs]
def allocate_output(self, out, shape, dtype):
"""Alocate output if required and check shape and dtype."""
if out is None:
if self.warn_on_allocation or self.error_on_allocation:
nbytes = np.prod(shape, dtype=np.int64) * dtype.itemsize
msg = "FftwFFT: allocating aligned output array of size {}."
msg = msg.format(bytes2str(nbytes))
if self.error_on_allocation:
raise RuntimeError(msg)
else:
warnings.warn(msg, HysopFFTWarning)
out = self.backend.empty(shape=shape, dtype=dtype)
else:
assert out.dtype == dtype
assert out.shape == shape
return out
[docs]
@classmethod
def default_interface(cls, **kwds):
"""Get the default FFT interface."""
msg = "{}.default_interface() has not been implemented yet !"
msg = msg.format(cls.__name__)
raise NotImplementedError(msg)
[docs]
def allocate_plans(self, op, plans, tmp_buffer=None):
"""Allocate and share a buffer on given backend to a group of plans."""
backend = self.backend
tmp_size = max(plan.required_buffer_size for plan in plans)
if tmp_size > 0:
msg = "Operator {}: Allocating an additional {} temporary buffer for FFT backend {}."
msg = msg.format(
op.pretty_name, bytes2str(tmp_size), self.__class__.__name__
)
if tmp_buffer is not None:
assert tmp_buffer.nbytes >= tmp_size
else:
if self.error_on_allocation:
raise RuntimeError(msg)
elif self.warn_on_allocation:
from .gpyfft_fft import HysopGpyFftWarning
warnings.warn(msg, HysopGpyFftWarning)
else:
vprint(msg)
tmp_buffer = backend.empty(shape=(tmp_size), dtype=np.uint8)
for plan in plans:
if plan.required_buffer_size > tmp_buffer.nbytes:
msg = (
"\nFATAL ERROR: Failed to allocate temporary buffer for clFFT."
)
msg += "\n => clFFT expected {} bytes but only {} bytes have been "
msg += "allocated.\n"
msg = msg.format(plan.required_buffer_size, tmp_buffer.nbytes)
raise RuntimeError(msg)
elif plan.required_buffer_size > 0:
buf = tmp_buffer[: plan.required_buffer_size]
plan.allocate(buf=buf)
else:
plan.allocate()
else:
for plan in plans:
assert plan.required_buffer_size == 0
plan.allocate()
tmp_buffer = None
return tmp_buffer
[docs]
@abstractmethod
def fft(self, a, out, axis=-1, **kwds):
"""
Compute the unscaled one-dimensional complex to complex discrete Fourier Transform.
Parameters
----------
a: array_like of np.complex64 or np.complex128
Complex input array.
out: array_like of np.complex64 or np.complex128
Complex output array of the same shape and dtype as the input.
axis: int, optional
Axis over witch to compute the FFT.
Defaults to last axis.
Returns
-------
(shape, dtype) of the output array determined from the input array.
Notes
-----
N = a.shape[axis]
out[0] will contain the sum of the signal (zero-frequency term always real for
real inputs).
If N is even:
out[1:N/2] contains the positive frequency terms.
out[N/2] contains the Nyquist frequency (always real for real inputs).
out[N/2+1:] contains the negative frequency terms.
Else if N is odd:
out[1:(N-1)/2] contains the positive frequency terms.
out[(N-1)/2:] contains the negative frequency terms.
"""
assert a.dtype in self.supported_ctypes, a.dtype
if out is not None:
assert a.dtype == out.dtype
assert np.array_equal(a.shape, out.shape)
return (a.shape, a.dtype)
[docs]
@abstractmethod
def ifft(self, a, out, axis=-1, **kwds):
"""
Compute the one-dimensional complex to complex discrete Fourier Transform scaled by 1/N.
Parameters
----------
a: array_like of np.complex64 or np.complex128
Complex input array.
out: array_like of np.complex64 or np.complex128
Complex output array of the same shape and dtype as the input.
axis: int, optional
Axis over witch to compute the FFT.
Defaults to last axis.
Returns
-------
(shape, dtype, logical_size) of the output array determined from the input array.
"""
assert a.dtype in self.supported_ctypes, a.dtype
if out is not None:
assert a.dtype == out.dtype
assert np.array_equal(a.shape, out.shape)
return (a.shape, a.dtype, a.shape[axis])
[docs]
@abstractmethod
def rfft(self, a, out, axis=-1, **kwds):
"""
Compute the unscaled one-dimensional real to hermitian complex discrete Fourier
Transform.
Parameters
----------
a: array_like of np.float32 or np.float64
Real input array.
out: array_like of np.complex64 or np.complex128
Complex output array of matching complex dtype.
out.shape[...] = a.shape[...]
out.shape[axis] = a.shape[axis]//2 + 1
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
Returns
-------
(shape, dtype) of the output array determined from the input array.
Notes
-----
For real inputs there is no information in the negative frequency components that
is not already available from the positive frequency component because of the
Hermitian symmetry.
N = out.shape[axis] = a.shape[axis]//2 + 1
out[0] will contain the sum of the signal (zero-frequency term, always real).
If N is even:
out[1:N/2] contains the positive frequency terms.
out[N/2] contains the Nyquist frequency (always real).
Else if N is odd:
out[1:(N+1)/2] contains the positive frequency terms.
"""
assert a.dtype in self.supported_ftypes
ctype = float_to_complex_dtype(a.dtype)
cshape = list(a.shape)
cshape[axis] = cshape[axis] // 2 + 1
cshape = tuple(cshape)
if out is not None:
assert out.dtype in self.supported_ctypes
assert ctype == out.dtype
assert np.array_equal(out.shape, cshape)
return (cshape, ctype)
[docs]
@abstractmethod
def irfft(self, a, out, n=None, axis=-1, **kwds):
"""
Compute the one-dimensional hermitian complex to real discrete Fourier Transform
scaled by 1/N.
Parameters
----------
a: array_like of np.complex64 or np.complex128
Complex input array.
out: array_like of np.float32 or np.float64
Real output array of matching real type.
out.shape[...] = a.shape[...]
Last axis should match forward transform size used:
1) out.shape[axis] = 2*(a.shape[axis]-1)
2) out.shape[axis] = 2*(a.shape[axis]-1) + 1
n: int, optional
Length of the transformed axis of the output.
ie: n should be in [2*(a.shape[axis]-1), 2*(a.shape[axis]-1)+1]
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
Notes
-----
To get an odd number of output points, n or out must be specified.
Returns
-------
(shape, dtype, logical_size) of the output array determined from the input array,
out and n.
"""
assert a.dtype in self.supported_ctypes
cshape = a.shape
rtype = complex_to_float_dtype(a.dtype)
rshape_even, rshape_odd = list(a.shape), list(a.shape)
rshape_even[axis] = 2 * (cshape[axis] - 1)
rshape_odd[axis] = 2 * (cshape[axis] - 1) + 1
if out is not None:
assert out.dtype in self.supported_ftypes
assert rtype == out.dtype
ns = out.shape[axis]
if n is not None:
assert (
ns == n
), "output shape mismatch with specified transformed output size."
else:
n = ns
if n % 2 == 0:
assert np.array_equal(out.shape, rshape_even)
else:
assert np.array_equal(out.shape, rshape_odd)
if (n is None) or (n % 2 == 0):
rshape = rshape_even
n = rshape[axis]
else:
rshape = rshape_odd
rshape = tuple(rshape)
logical_size = n
assert rshape[axis] == logical_size
return (rshape, rtype, logical_size)
[docs]
@abstractmethod
def dct(self, a, out=None, type=2, axis=-1, **kwds):
"""
Compute the one-dimensional Cosine Transform of specified type.
Parameters
----------
a: array_like
Real input array.
out: array_like
Real output array of matching input type and shape.
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
Returns
-------
(shape, dtype) of the output array determined from the input array.
"""
assert a.dtype in self.supported_ftypes, a.dtype
assert (
type in self.supported_cosine_transforms
), self.supported_cosine_transforms
if out is not None:
assert a.dtype == out.dtype
assert np.array_equal(a.shape, out.shape)
return (a.shape, a.dtype)
[docs]
@abstractmethod
def idct(self, a, out=None, type=2, axis=-1, **kwds):
"""
Compute the one-dimensional Inverse Cosine Transform of specified type.
Default scaling is 1/(2*N) for IDCT type (2,3,4) and
1/(2*N-2) for IDCT type 1.
Parameters
----------
a: array_like
Real input array.
out: array_like
Real output array of matching input type and shape.
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
Returns
-------
(shape, dtype, inverse_type, logical_size) of the output array determined from the input
array.
"""
itype = [1, 3, 2, 4][type - 1]
n = a.shape[axis]
N = 2 * (n - (itype == 1))
logical_size = N
assert a.dtype in self.supported_ftypes, a.dtype
assert (
itype in self.supported_cosine_transforms
), self.supported_cosine_transforms
if out is not None:
assert a.dtype == out.dtype
assert np.array_equal(a.shape, out.shape)
return (a.shape, a.dtype, itype, logical_size)
[docs]
@abstractmethod
def dst(self, a, out=None, type=2, axis=-1, **kwds):
"""
Compute the one-dimensional Sine Transform of specified type.
Parameters
----------
a: array_like
Real input array.
out: array_like
Real output array of matching input type and shape.
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
Returns
-------
(shape, dtype) of the output array determined from the input array.
"""
assert a.dtype in self.supported_ftypes, a.dtype
assert type in self.supported_sine_transforms, self.supported_sine_transforms
if out is not None:
assert a.dtype == out.dtype
assert np.array_equal(a.shape, out.shape)
return (a.shape, a.dtype)
[docs]
@abstractmethod
def idst(self, a, out=None, type=2, axis=-1, **kwds):
"""
Compute the one-dimensional Inverse Sine Transform of specified type.
Default scaling is 1/(2*N) for IDST type (2,3,4) and
1/(2*N+2) for IDST type 1.
Parameters
----------
a: array_like
Real input array.
out: array_like
Real output array of matching input type and shape.
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
Returns
-------
(shape, dtype, inverse_type, logical_size) of the output array determined from the input
array.
"""
itype = [1, 3, 2, 4][type - 1]
n = a.shape[axis]
N = 2 * (n + (itype == 1))
logical_size = N
assert a.dtype in self.supported_ftypes, a.dtype
assert type in self.supported_sine_transforms, self.supported_sine_transforms
if out is not None:
assert a.dtype == out.dtype
assert np.array_equal(a.shape, out.shape)
return (a.shape, a.dtype, itype, logical_size)
[docs]
@abstractmethod
def new_queue(self, tg, name):
"""Return a FFTQueue object valid with this backend."""
pass
[docs]
@abstractmethod
def plan_copy(self, tg, src, dst):
"""Plan a copy from src to dst."""
pass
[docs]
@abstractmethod
def plan_accumulate(self, tg, src, dst):
"""Plan an accumulation from src into dst."""
pass
[docs]
@abstractmethod
def plan_transpose(self, tg, src, dst, axes):
"""Plan a transpose from src to dst using given axes."""
pass
[docs]
@abstractmethod
def plan_fill_zeros(self, tg, a, slices):
"""Plan to fill every input slices of input array a with zeroes."""
pass
[docs]
@abstractmethod
def plan_compute_energy(self, tg, fshape, src, dst, transforms, mutexes=None):
"""Plan to compute energy from src to energy."""
assert src.ndim == len(transforms)
assert dst.ndim == 1
N = tuple(int(_) for _ in src.shape)
K2 = ()
NS2 = ()
C2C = ()
R2C = 0
S = 1.0
assert len(fshape) == len(N) == len(transforms)
for Fi, Ni, Ti in zip(fshape, N, transforms):
c2c = int(STU.is_C2C(Ti))
r2c = int(STU.is_R2C(Ti) or STU.is_C2R(Ti))
Ki = Ni // 2 if c2c else Ni - 1
if r2c:
Si = Fi / 2.0
else:
Si = Fi
S *= Si
K2 += (Ki**2,)
NS2 += ((Ni + 1) // 2,)
C2C += (c2c,)
R2C |= r2c
S = 1 / S
# for C2C we need to check j = (i<(N+1)//2 ? i : N-i)
max_wavenumber = int(round(sum(K2) ** 0.5, 0))
msg = f"Destination buffer should have size {max_wavenumber+1} but has size {dst.size}."
assert dst.size == max_wavenumber + 1, msg
return (N, NS2, C2C, R2C, S)